{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<center>\n",
    "<img src=\"../../img/ods_stickers.jpg\" />\n",
    "    \n",
    "## [mlcourse.ai](https://mlcourse.ai) – Open Machine Learning Course \n",
    "### <center> Author: Irina Knyazeva, ODS Slack nickname: iknyazeva\n",
    "    \n",
    "## <center> Tutorial\n",
    "### <center> \"HANDLING DIFFERENT DATASETS WITH DASK AND TRYING A LITTLE DASK ML\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## WHY DO I NEED DASK?\n",
    "\n",
    "Dask provides high-level Array, Bag, and DataFrame collections that mimic NumPy, lists, and Pandas but can operate in parallel on datasets that don’t fit into main memory. Dask’s high-level collections are alternatives to NumPy and Pandas for large datasets.\n",
    "\n",
    "## YOU DEFINITELY NEED DASK IF\n",
    "the problem size is close to the limits of RAM, but the data still fits on disk.\n",
    "\n",
    "\n",
    "## Reading list\n",
    "This notebook is based mainly on the following sources:\n",
    "\n",
    "- [Tutorial from Analytics Vidhya](https://www.analyticsvidhya.com/blog/2018/08/dask-big-datasets-machine_learning-python/)\n",
    "- [Article from Towards Data Science](https://towardsdatascience.com/trying-out-dask-dataframes-in-python-for-fast-data-analysis-in-parallel-aa960c18a915)\n",
    "- [DataCamp course](https://campus.datacamp.com/courses/parallel-computing-with-dask/)\n",
    "- [Dask documentation](https://docs.dask.org/en/latest/) \n",
    "\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import gc\n",
    "import os\n",
    "import time\n",
    "import warnings\n",
    "\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "import psutil\n",
    "from dask import delayed\n",
    "\n",
    "warnings.filterwarnings(\"ignore\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's write a small function that tracks how much memory (in MB) the current Python process is using"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def memory_footprint():\n",
    "    mem = psutil.Process(os.getpid()).memory_info().rss\n",
    "    return mem / 1024 ** 2"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "before = memory_footprint()\n",
    "print(f\"Memory used before is {round(before,2)} MB\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "N = (1024 ** 2) // 8\n",
    "x = np.random.randn(50 * N)\n",
    "after = memory_footprint()\n",
    "print(f\"Memory used after is {round(after,2)} MB\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Computing an expression without binding the result to a variable can still allocate extra memory"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "x ** 2\n",
    "after1 = memory_footprint()\n",
    "print(f\"Extra memory obtained after computation: {round(after1 - after, 2)} MB\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Dask arrays\n",
    "\n",
    "Dask Array implements a subset of the NumPy ndarray interface using blocked algorithms, cutting up the large array into many small arrays. This lets us compute on arrays larger than memory using all of our cores. We coordinate these blocked algorithms using Dask graphs. See the [Dask Array documentation](http://docs.dask.org/en/latest/array.html).\n",
    "\n",
    "\n",
    "\n",
    "<center>\n",
    "<img src=\"http://docs.dask.org/en/latest/_images/dask-array-black-text.svg\" />\n",
    "\n",
    "Dask has three main collections: Dask Array (based on the NumPy array), Dask DataFrame (based on the pandas DataFrame), and Dask Bag (for unstructured data such as text)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import dask.array as da\n",
    "\n",
    "y = da.from_array(x, chunks=len(x) // 4)\n",
    "print(\"Dask arrays require little memory:\", memory_footprint() - after1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "t_start = time.time()\n",
    "x.mean()\n",
    "t_end = time.time()\n",
    "print(\"Compute the mean value of this numpy array \\n\")\n",
    "print(\n",
    "    \"Elapsed time to compute the mean of the numpy array (ms):\",\n",
    "    round((t_end - t_start) * 1000),\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "t_start = time.time()\n",
    "y.mean().compute()\n",
    "t_end = time.time()\n",
    "print(\"Compute the same with dask \\n\")\n",
    "print(\n",
    "    \"Elapsed time to compute the mean of the dask array (ms):\", round((t_end - t_start) * 1000)\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In practice you would not use Dask like this: if the NumPy array is already in memory, partitioning it usually only adds overhead. But if you need to process data from HDF5, NetCDF, or a large collection of NumPy files on disk, Dask arrays can be extremely useful."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Delayed operations with dask\n",
    "\n",
    "Dask can also be useful for small data through delayed computation, which makes it easy to parallelize independent steps. Let's look at an example with our previous NumPy array."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def f(z):\n",
    "    return np.sqrt(z + 4)\n",
    "\n",
    "\n",
    "def g(y):\n",
    "    return y - 3\n",
    "\n",
    "\n",
    "def h(x):\n",
    "    return x ** 2\n",
    "\n",
    "\n",
    "# Generate the data before starting the timer, so that only the\n",
    "# computation itself is measured (as in the delayed version below)\n",
    "x = np.random.randn(50 * N)\n",
    "time_start = time.time()\n",
    "y = h(x)\n",
    "z = g(x)\n",
    "w = f(z + y)\n",
    "time_end = time.time()\n",
    "print(\n",
    "    \"Elapsed time to compute the chained functions on a numpy array (ms):\",\n",
    "    round((time_end - time_start) * 1000),\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "y = delayed(h)(x)\n",
    "z = delayed(g)(x)\n",
    "w = delayed(f)(z + y)\n",
    "print(\"The result is a dask Delayed object:\", w)\n",
    "time_start = time.time()\n",
    "w.compute()\n",
    "time_end = time.time()\n",
    "print(\n",
    "    \"Elapsed time for compute complex functions with numpy array with dask delayed (ms):\",\n",
    "    round((time_end - time_start) * 1000),\n",
    ")"
   ]
  },
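  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The computational graph makes it easy to see why the computation time can decrease: `h` and `g` do not depend on each other, so they can run in parallel. Let's build the same computation with the second way of creating delayed functions, the `@delayed` decorator, and visualize the graph."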
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@delayed\n",
    "def f(z):\n",
    "    return np.sqrt(z + 4)\n",
    "\n",
    "\n",
    "@delayed\n",
    "def g(y):\n",
    "    return y - 3\n",
    "\n",
    "\n",
    "@delayed\n",
    "def h(x):\n",
    "    return x ** 2\n",
    "\n",
    "\n",
    "y = h(x)\n",
    "z = g(x)\n",
    "w = f(z + y)\n",
    "w.visualize()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Dask dataframe \n",
    "\n",
    "Dask DataFrames coordinate many Pandas DataFrames/Series arranged along the index. A Dask DataFrame is partitioned row-wise, grouping rows by index value for efficiency. These Pandas objects may live on disk or on other machines.\n",
    "[See documentation](http://docs.dask.org/en/latest/dataframe.html)\n",
    "\n",
    "<center>\n",
    "<img src=\"http://docs.dask.org/en/latest/_images/dask-dataframe.svg\" width=\"40%\" height=\"40%\" />\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here we use the file `athlete_events.csv` from [this Kaggle Dataset](https://www.kaggle.com/heesoo37/120-years-of-olympic-history-athletes-and-results)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import dask.dataframe as dd"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"Let's return to start of our ML journey\\n\")\n",
    "print(\"Load olympic dataset \\n\")\n",
    "PATH = \"../../data/athlete_events.csv\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = pd.read_csv(PATH)\n",
    "df.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "m1 = memory_footprint()\n",
    "dask_df = dd.read_csv(PATH)\n",
    "m2 = memory_footprint()\n",
    "print(\"Dask allocates almost no memory on creation (MB):\", m2 - m1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"But we can still look at the data as in a pandas dataframe:\")\n",
    "dask_df.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# build a delayed computation\n",
    "print(\n",
    "    \"We can do many operations the same way as in pandas, but without loading all the data into memory \\n \"\n",
    ")\n",
    "sex_distr = (\n",
    "    dask_df.loc[dask_df[\"Games\"].str.contains(\"1996\")].groupby(\"Sex\")[\"Age\"].min()\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\n",
    "    \"Here we did the selection and aggregation exactly the same way as in pandas \\n\"\n",
    ")\n",
    "print(\"But no computation has happened yet, we only created a dask structure \\n\")\n",
    "sex_distr"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "print(\n",
    "    \"Computation is time consuming, but remember that we don't need to load all the data into memory for it \\n\"\n",
    ")\n",
    "print(sex_distr.compute())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "print(\"Pandas is of course more efficient when the data fits in memory \\n\")\n",
    "print(df.loc[df[\"Games\"].str.contains(\"1996\")].groupby(\"Sex\")[\"Age\"].min())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Compatibility with Pandas API\n",
    "- Unavailable or limited in dask.dataframe:\n",
    "    * some unsupported file formats (e.g., .xls, .zip,...)\n",
    "    * sorting (a full sort has to shuffle data between partitions; recent Dask versions provide `sort_values`, but it is expensive)\n",
    "- Available in dask.dataframe:\n",
    "    * indexing, selection, & reindexing\n",
    "    * aggregations: .sum(), .mean(), .std(), .min(), .max() etc.\n",
    "    * grouping with .groupby()\n",
    "    * datetime conversion with dd.to_datetime()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Read collections of files to dask dataframe\n",
    "As an example, I took the data from the Alice project: the Capstone_user_identification archive ([link](https://drive.google.com/open?id=1AU3M_mFPofbfhFQa_Bktozq_vFREkWJA), ~7 MB zipped, ~60 MB unzipped).\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "PATH_TO_DATA = \"../../data/capstone_user_identification\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"We can load all files into a single dataframe \\n\")\n",
    "print(\"You don't need this in the Alice project, it is just an example \\n \")\n",
    "user10dask = dd.read_csv(os.path.join(PATH_TO_DATA, \"10users/*.csv\"))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"We can look at the data\")\n",
    "print(user10dask)\n",
    "user10dask.tail()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\n",
    "    \"Let's see what happens if we want to count all sites (this can be seen as one more way to build the site dictionary) \\n\"\n",
    ")\n",
    "count_sites = user10dask.groupby(\"site\")[\"site\"].count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"If we visualize this structure we'll see the picture of computation \\n\")\n",
    "count_sites.visualize()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "count_sites.compute().sort_values(ascending=False)[:20]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## JSON Files into Dask Bags\n",
    "\n",
    "Dask Bag implements operations like map, filter, fold, and groupby on collections of Python objects. It does this in parallel with a small memory footprint using Python iterators. It is similar to a parallel version of PyToolz or a Pythonic version of the PySpark RDD. See the [Dask Bag documentation](http://docs.dask.org/en/latest/bag-overview.html).\n",
    "\n",
    "\n",
    "Dask bags are often used to parallelize simple computations on unstructured or semi-structured data like text data, log files, JSON records, or user defined Python objects.\n",
    " \n",
    "Let's look at an example with our Medium data from [this competition](https://www.kaggle.com/c/how-good-is-your-medium-article/data).\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "\n",
    "import dask.bag as db"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"Path to our medium data \\n\")\n",
    "PATH = \"../../data/kaggle_medium\"\n",
    "print(PATH)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"Wrap train json to dask bag format \\n\")\n",
    "items = db.read_text(os.path.join(PATH, \"train.json\"))\n",
    "items"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "print(\"Let's look at one example \\n\")\n",
    "print(items.take(1))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"We can parse the data with the json library and get dict-like objects \\n\")\n",
    "dict_items = items.map(json.loads)\n",
    "print(type(dict_items))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dict_items.take(1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"We can take any key from all records \\n\")\n",
    "title_bag = dict_items.pluck(\"title\")\n",
    "print(\"The take method returns a tuple of objects \\n\")\n",
    "print(title_bag.take(3))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can write any function for processing the data and apply it to every record with the `map` method"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def clean_title(text):\n",
    "\n",
    "    import string\n",
    "\n",
    "    cut_set = set(string.punctuation)\n",
    "    cut_set.update([\"”\", \"—\", \"…\", \"“\", \"⌘\", \"❤\", \"+\", \"®\", \"➜\", \"¬\", \"–\"])\n",
    "    text = text.translate(text.maketrans(\"\".join(cut_set), \" \" * len(cut_set)))\n",
    "    text = text.lower()\n",
    "    return text"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "title_bag = dict_items.pluck(\"title\").map(clean_title)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "title_bag.take(3)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Process the `meta_tags` field"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "meta_tags_bag = dict_items.pluck(\"meta_tags\")\n",
    "test_meta = meta_tags_bag.take(3)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "test_meta[1]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def clean_meta_tags(meta):\n",
    "    author = meta[\"author\"].strip()\n",
    "    min_reads = int(meta[\"twitter:data1\"].split()[0])\n",
    "    return {\"author\": author, \"min_reads\": min_reads}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "meta_tags_bag = meta_tags_bag.map(clean_meta_tags)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "meta_tags_bag.take(1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Combine all together"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "# content_bag = dict_items.pluck('content').map(clean_content)\n",
    "title_bag = dict_items.pluck(\"title\").map(clean_title)\n",
    "published_bag = dict_items.pluck(\"published\").map(lambda x: x[\"$date\"])\n",
    "meta_bag = dict_items.pluck(\"meta_tags\").map(clean_meta_tags)\n",
    "domain_bag = dict_items.pluck(\"domain\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@delayed\n",
    "def combine_to_df(list_dict):\n",
    "\n",
    "    list_df = [pd.DataFrame(dict_) for dict_ in list_dict]\n",
    "    return pd.concat(list_df, axis=1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "combined = combine_to_df([published_bag, meta_bag, domain_bag])\n",
    "combined.visualize()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# It takes time, around a minute\n",
    "from dask.diagnostics import ProgressBar\n",
    "\n",
    "with ProgressBar():\n",
    "    df = combined.compute()\n",
    "df.columns = [\"published\", \"Author\", \"min_reads\", \"domain\"]\n",
    "df.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"We can create dask dataframe from pandas \\n\")\n",
    "dd_no_content = dd.from_pandas(df, npartitions=4)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dd_no_content"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "print(\n",
    "    \"Transform the published column to datetime with pandas \\n\"\n",
    ")\n",
    "df[\"published\"] = pd.to_datetime(df.published, format=\"%Y-%m-%dT%H:%M:%S.%fZ\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "print(\"Transform the published column to datetime with dask, it will be slightly slower than in pandas \\n\")\n",
    "dd_no_content[\"published\"] = dd.to_datetime(\n",
    "    dd_no_content.published, format=\"%Y-%m-%dT%H:%M:%S.%fZ\"\n",
    ").compute()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dd_no_content.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\n",
    "    \"We can apply a function with mixed transformations, written for a pandas df, to a dask dataframe without changes \\n\"\n",
    ")\n",
    "\n",
    "\n",
    "def additional_time_features_df(\n",
    "    df, to_cat_cols=[\"Author\", \"domain\", \"month\", \"year\", \"day_of_week\"]\n",
    "):\n",
    "\n",
    "    df[\"month\"] = df[\"published\"].apply(lambda ts: ts.month)\n",
    "    df[\"year\"] = df[\"published\"].apply(lambda ts: ts.year)\n",
    "    hour = df[\"published\"].apply(lambda ts: ts.hour)\n",
    "    df[\"hour\"] = hour\n",
    "    df[\"morning\"] = ((hour >= 7) & (hour <= 11)).astype(\"float64\")\n",
    "    df[\"day\"] = ((hour >= 12) & (hour <= 18)).astype(\"int\")\n",
    "    df[\"evening\"] = ((hour >= 19) & (hour <= 23)).astype(\"int\")\n",
    "    df[\"night\"] = ((hour >= 0) & (hour <= 6)).astype(\"int\")\n",
    "    df[\"sin_hour\"] = np.sin(2 * np.pi * df[\"hour\"] / 24)\n",
    "    df[\"cos_hour\"] = np.cos(2 * np.pi * df[\"hour\"] / 24)\n",
    "    df = df.drop([\"hour\"], axis=1)\n",
    "    day_of_week = df[\"published\"].dt.dayofweek.astype(\"int\")\n",
    "    df[\"day_of_week\"] = day_of_week\n",
    "    df[\"weekend\"] = (day_of_week >= 5).astype(\"int\")\n",
    "    # turn to categorical\n",
    "    df[to_cat_cols] = df[to_cat_cols].astype(\"category\")\n",
    "\n",
    "    return df"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "df_medium_train = additional_time_features_df(df.copy())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dd_medium_train = additional_time_features_df(dd_no_content)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "dd_medium_train.compute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Dask ML\n",
    "\n",
    "Dask ML provides scalable machine learning algorithms in Python which are compatible with scikit-learn. Let us first look at how scikit-learn handles the computations and then at how Dask performs these operations differently. See the dask-ml tutorials: [Examples from dask ml](http://ml.dask.org/examples.html)\n",
    "\n",
    "You need to install dask-ml first.\n",
    "\n",
    "There are two main parts in dask ml:\n",
    "\n",
    "- approaches to handle big datasets\n",
    "- approaches to handle big models"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Handle big model with dask distributed\n",
    "The biggest model in our course was a random forest on text data, from the week with the Random Forest assignment. Below I reproduce part of that assignment with a reduced `nrows` and a smaller `max_features` in `CountVectorizer`; you can also try the original parameters.\n",
    "\n",
    "Here we use the [`movie_reviews_train.csv`](https://drive.google.com/file/d/1WDz3EB0MMuQUuUTwZ30c4JJrN8d9shAW/view?usp=sharing) file."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Load data\n",
    "df = pd.read_csv(\"../../data/movie_reviews_train.csv\", nrows=5000)\n",
    "\n",
    "# Separate the texts and the labels\n",
    "X_text = df[\"text\"]\n",
    "y_text = df[\"label\"]\n",
    "\n",
    "# Classes counts\n",
    "df.label.value_counts()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sklearn.feature_extraction.text import CountVectorizer\n",
    "from sklearn.linear_model import LogisticRegression\n",
    "from sklearn.model_selection import GridSearchCV, StratifiedKFold\n",
    "from sklearn.pipeline import Pipeline\n",
    "\n",
    "# Split on 3 folds\n",
    "skf = StratifiedKFold(n_splits=3, shuffle=True, random_state=17)\n",
    "\n",
    "# In Pipeline we will modify the text and train logistic regression\n",
    "classifier = Pipeline(\n",
    "    [\n",
    "        (\"vectorizer\", CountVectorizer(max_features=500, ngram_range=(1, 3))),\n",
    "        (\"clf\", LogisticRegression(random_state=17)),\n",
    "    ]\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "parameters = {\"clf__C\": (0.1, 1, 10, 100)}\n",
    "grid_search = GridSearchCV(classifier, parameters, scoring=\"roc_auc\", cv=skf)\n",
    "grid_search = grid_search.fit(X_text, y_text)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "grid_search.best_score_"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Replace joblib with dask\n",
    "\n",
    "In this approach all we need to do is replace the default joblib backend with dask distributed: we initialize a distributed client and change the backend."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "import joblib\n",
    "from dask.distributed import Client\n",
    "\n",
    "client = Client()\n",
    "parameters = {\"clf__C\": (0.1, 1, 10, 100)}\n",
    "grid_search = GridSearchCV(classifier, parameters, scoring=\"roc_auc\", cv=skf)\n",
    "\n",
    "t_start = time.time()\n",
    "\n",
    "with joblib.parallel_backend(\"dask\"):\n",
    "    grid_search.fit(X_text, y_text)\n",
    "t_end = time.time()\n",
    "print(\"Elapsed time for grid_search with joblib replace (s):\", round((t_end - t_start)))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "grid_search.best_score_"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Replace Grid search with dask\n",
    "Analogous to GridSearchCV in sklearn, Dask provides a library called Dask-SearchCV (now included in Dask ML, in `dask_ml.model_selection`). It merges shared steps so that there are fewer repeated computations. The standalone package used here has to be installed separately; the install command is in the cell below."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# pip3 install dask-searchcv\n",
    "import dask_searchcv as dcv"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Pipelines can be used in the dask grid search, and according to the documentation dask pays off most for pipelines with many operations that can be parallelized, especially those that include a feature union. I tried that and got an error... In any case, time-consuming operations such as CountVectorizer can't be parallelized, so here the dask grid search is applied to the classifier only. See the [documentation](https://dask-searchcv.readthedocs.io/en/latest/). "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "vect = CountVectorizer(max_features=500, ngram_range=(1, 3))\n",
    "Xvect = vect.fit_transform(X_text)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "lr = LogisticRegression()\n",
    "parameters = {\"C\": (0.1, 1, 10, 100)}\n",
    "t_start = time.time()\n",
    "grid_search = dcv.GridSearchCV(lr, parameters, scoring=\"roc_auc\", cv=skf)\n",
    "grid_search.fit(Xvect, y_text)\n",
    "t_end = time.time()\n",
    "print(\n",
    "    f\"Elapsed time for grid_search (excluding vectorization): {round(t_end - t_start)} s\"\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "grid_search.best_score_"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "I tried to see how well dask would do with a random forest with the original parameters, but sometimes this raises an error (`OSError: [Errno 24] Too many open files`) after execution, and I couldn't fix it. Sometimes it works fine, and for small data it works in most cases, but if you re-run this notebook several times there is a good chance of getting this error. So I believe dask-ml is very useful, but for now I don't know how it should be used properly. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sklearn.ensemble import RandomForestClassifier\n",
    "\n",
    "rf = RandomForestClassifier(random_state=17)\n",
    "min_samples_leaf = [1, 2, 3]\n",
    "max_features = [0.3, 0.5, 0.7]\n",
    "max_depth = [None]\n",
    "\n",
    "parameters = {\n",
    "    \"max_features\": max_features,\n",
    "    \"min_samples_leaf\": min_samples_leaf,\n",
    "    \"max_depth\": max_depth,\n",
    "}\n",
    "grid_search = dcv.GridSearchCV(rf, parameters, scoring=\"roc_auc\", cv=skf)\n",
    "t_start = time.time()\n",
    "grid_search.fit(Xvect, y_text)\n",
    "t_end = time.time()\n",
    "print(\n",
    "    f\"Elapsed time for dask grid_search for Random Forest: {round(t_end - t_start)} s\"\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Handle model with big data\n",
    "A number of models have been rewritten in dask so that they can take dask objects (huge arrays) and fit models on them. You can read more in the dask documentation. Below is an example with KMeans, but there are also dask versions of linear models and preprocessing functions. The interface is very similar to scikit-learn, so it should be easy to use. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask_ml import datasets\n",
    "from dask_ml.cluster import KMeans"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "X, y = datasets.make_blobs(\n",
    "    n_samples=10000000, chunks=1000000, random_state=0, centers=3\n",
    ")\n",
    "# persist() starts computing the chunks and keeps the results in memory;\n",
    "# X is still a dask array, not a lazy dask.delayed object\n",
    "X = X.persist()\n",
    "X"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "km = KMeans(n_clusters=3, init_max_iter=2, oversampling_factor=10)\n",
    "km.fit(X)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "I read an article about dask only a couple of days ago and decided that writing a tutorial would be a good way to get acquainted with the library. So please don't judge too strictly if I misunderstood something :)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
